package cn.doitedu.rtmk.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;

/**
 * Demo/fixture helpers that seed the external stores used by the real-time marketing demo:
 * <ul>
 *   <li>{@link #tags()} writes two sample user-profile rows into the HBase table
 *       {@code user_profile};</li>
 *   <li>{@link #insertRule()} inserts one rule definition (parameter JSON, parameter-bean source,
 *       calculator source and a serialized crowd bitmap) into the MySQL table
 *       {@code rule_res_v2}.</li>
 * </ul>
 * Both methods talk to live services on host {@code doitedu}; they are not unit tests in the
 * strict sense.
 */
public class UserTagsMoni {

    /** Column family used for every profile tag written by {@link #tags()}. */
    private static final byte[] PROFILE_FAMILY = Bytes.toBytes("f");

    /** Rule parameters, stored as JSON alongside the rule. */
    private static final String PARAM_JSON = "{\"ruleModelId\":\"rule_model_001\",\"ruleId\":\"r001_001\",\"fireEventId\":\"add_cart\",\"fileEventPropertyKey\":\"item_price\",\"fileEventPropertyValue\":100,\"ageConditionMin\":20,\"ageConditionMax\":40,\"genderCondition\":\"male\",\"tg03Condition\":1000,\"dynamicConditionValue1\":2,\"dynamicConditionvalue2\":0}";

    /** Java source of the parameter bean that {@link #PARAM_JSON} is parsed into. */
    private static final String PARAM_BEAN_CODE = "package cn.doitedu.rtmk.demo.demo6;\n" +
            "\n" +
            "public class RuleModel001ParamBean {\n" +
            "    String ruleModelId;\n" +
            "    String ruleId;\n" +
            "\n" +
            "    String fireEventId;\n" +
            "    String fileEventPropertyKey;\n" +
            "    int fileEventPropertyValue;\n" +
            "\n" +
            "    int ageConditionMin;\n" +
            "    int ageConditionMax;\n" +
            "    String genderCondition;\n" +
            "    int tg03Condition;\n" +
            "\n" +
            "    int dynamicConditionValue1;\n" +
            "    int dynamicConditionvalue2;\n" +
            "\n" +
            "    public RuleModel001ParamBean() {\n" +
            "    }\n" +
            "\n" +
            "    public RuleModel001ParamBean(String ruleModelId, String ruleId, String fireEventId, String fileEventPropertyKey, int fileEventPropertyValue, int ageConditionMin, int ageConditionMax, String genderCondition, int tg03Condition, int dynamicConditionValue1, int dynamicConditionvalue2) {\n" +
            "        this.ruleModelId = ruleModelId;\n" +
            "        this.ruleId = ruleId;\n" +
            "        this.fireEventId = fireEventId;\n" +
            "        this.fileEventPropertyKey = fileEventPropertyKey;\n" +
            "        this.fileEventPropertyValue = fileEventPropertyValue;\n" +
            "        this.ageConditionMin = ageConditionMin;\n" +
            "        this.ageConditionMax = ageConditionMax;\n" +
            "        this.genderCondition = genderCondition;\n" +
            "        this.tg03Condition = tg03Condition;\n" +
            "        this.dynamicConditionValue1 = dynamicConditionValue1;\n" +
            "        this.dynamicConditionvalue2 = dynamicConditionvalue2;\n" +
            "    }\n" +
            "\n" +
            "    public String getRuleModelId() {\n" +
            "        return ruleModelId;\n" +
            "    }\n" +
            "\n" +
            "    public void setRuleModelId(String ruleModelId) {\n" +
            "        this.ruleModelId = ruleModelId;\n" +
            "    }\n" +
            "\n" +
            "    public String getRuleId() {\n" +
            "        return ruleId;\n" +
            "    }\n" +
            "\n" +
            "    public void setRuleId(String ruleId) {\n" +
            "        this.ruleId = ruleId;\n" +
            "    }\n" +
            "\n" +
            "    public String getFireEventId() {\n" +
            "        return fireEventId;\n" +
            "    }\n" +
            "\n" +
            "    public void setFireEventId(String fireEventId) {\n" +
            "        this.fireEventId = fireEventId;\n" +
            "    }\n" +
            "\n" +
            "    public String getFileEventPropertyKey() {\n" +
            "        return fileEventPropertyKey;\n" +
            "    }\n" +
            "\n" +
            "    public void setFileEventPropertyKey(String fileEventPropertyKey) {\n" +
            "        this.fileEventPropertyKey = fileEventPropertyKey;\n" +
            "    }\n" +
            "\n" +
            "    public int getFileEventPropertyValue() {\n" +
            "        return fileEventPropertyValue;\n" +
            "    }\n" +
            "\n" +
            "    public void setFileEventPropertyValue(int fileEventPropertyValue) {\n" +
            "        this.fileEventPropertyValue = fileEventPropertyValue;\n" +
            "    }\n" +
            "\n" +
            "    public int getAgeConditionMin() {\n" +
            "        return ageConditionMin;\n" +
            "    }\n" +
            "\n" +
            "    public void setAgeConditionMin(int ageConditionMin) {\n" +
            "        this.ageConditionMin = ageConditionMin;\n" +
            "    }\n" +
            "\n" +
            "    public int getAgeConditionMax() {\n" +
            "        return ageConditionMax;\n" +
            "    }\n" +
            "\n" +
            "    public void setAgeConditionMax(int ageConditionMax) {\n" +
            "        this.ageConditionMax = ageConditionMax;\n" +
            "    }\n" +
            "\n" +
            "    public String getGenderCondition() {\n" +
            "        return genderCondition;\n" +
            "    }\n" +
            "\n" +
            "    public void setGenderCondition(String genderCondition) {\n" +
            "        this.genderCondition = genderCondition;\n" +
            "    }\n" +
            "\n" +
            "    public int getTg03Condition() {\n" +
            "        return tg03Condition;\n" +
            "    }\n" +
            "\n" +
            "    public void setTg03Condition(int tg03Condition) {\n" +
            "        this.tg03Condition = tg03Condition;\n" +
            "    }\n" +
            "\n" +
            "    public int getDynamicConditionValue1() {\n" +
            "        return dynamicConditionValue1;\n" +
            "    }\n" +
            "\n" +
            "    public void setDynamicConditionValue1(int dynamicConditionValue1) {\n" +
            "        this.dynamicConditionValue1 = dynamicConditionValue1;\n" +
            "    }\n" +
            "\n" +
            "    public int getDynamicConditionvalue2() {\n" +
            "        return dynamicConditionvalue2;\n" +
            "    }\n" +
            "\n" +
            "    public void setDynamicConditionvalue2(int dynamicConditionvalue2) {\n" +
            "        this.dynamicConditionvalue2 = dynamicConditionvalue2;\n" +
            "    }\n" +
            "}";

    /**
     * Java source of the rule calculator stored with the rule.
     *
     * <p>Differs from the previous literal in one place: the 30-minute TTL config is now enabled
     * on {@code con2StateDesc}. It used to be applied to {@code con1StateDesc} a second time
     * (copy-paste slip), which left {@code con2_state} without any TTL.
     */
    private static final String CALCULATOR_CODE = "package cn.doitedu.rtmk.demo.demo6;\n" +
            "\n" +
            "import com.alibaba.fastjson.JSON;\n" +
            "import org.apache.flink.api.common.functions.RuntimeContext;\n" +
            "import org.apache.flink.api.common.state.ListState;\n" +
            "import org.apache.flink.api.common.state.ListStateDescriptor;\n" +
            "import org.apache.flink.api.common.state.StateTtlConfig;\n" +
            "import org.apache.flink.api.common.time.Time;\n" +
            "import org.apache.flink.util.Collector;\n" +
            "import org.roaringbitmap.RoaringBitmap;\n" +
            "import org.slf4j.Logger;\n" +
            "import org.slf4j.LoggerFactory;\n" +
            "\n" +
            "import java.io.IOException;\n" +
            "\n" +
            "\n" +
            "public class RuleModel001Calculator implements RuleModelCalculator {\n" +
            "\n" +
            "    private static final Logger log  = LoggerFactory.getLogger(RuleModel001Calculator.class);\n" +
            "\n" +
            "    RuleModel001ParamBean ruleParamBean;\n" +
            "    ListState<EventBean> con1State;\n" +
            "    ListState<EventBean> con2State;\n" +
            "    RoaringBitmap btm;\n" +
            "\n" +
            "    @Override\n" +
            "    public String getRuleId() {\n" +
            "        return ruleParamBean.getRuleId();\n" +
            "    }\n" +
            "\n" +
            "    @Override\n" +
            "    public void init(String ruleParamJson, RuntimeContext runtimeContext, RoaringBitmap btm) throws IOException {\n" +
            "        this.btm = btm;\n" +
            "\n" +
            "        ruleParamBean = JSON.parseObject(ruleParamJson, RuleModel001ParamBean.class);\n" +
            "\n" +
            "        // 创建动态画像所需要的 状态\n" +
            "        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(5)).useProcessingTime().build();\n" +
            "        ListStateDescriptor<EventBean> con1StateDesc =\n" +
            "                new ListStateDescriptor<>(\"con1_state\", EventBean.class);\n" +
            "        con1StateDesc.enableTimeToLive(ttlConfig);\n" +
            "        con1State = runtimeContext.getListState(con1StateDesc);\n" +
            "\n" +
            "        // 创建动态画像所需要的 状态\n" +
            "        StateTtlConfig ttlConfig2 = StateTtlConfig.newBuilder(Time.minutes(30)).useProcessingTime().build();\n" +
            "        ListStateDescriptor<EventBean> con2StateDesc =\n" +
            "                new ListStateDescriptor<>(\"con2_state\", EventBean.class);\n" +
            "        con2StateDesc.enableTimeToLive(ttlConfig2);\n" +
            "        con2State = runtimeContext.getListState(con2StateDesc);\n" +
            "\n" +
            "    }\n" +
            "\n" +
            "\n" +
            "    @Override\n" +
            "    public void calc(\n" +
            "            EventBean eventBean,\n" +
            "            Collector<RtmkMessage> out\n" +
            "    ) throws Exception {\n" +
            "        // 进行规则所需要的实时画像标签的计算\n" +
            "        // 实时画像条件：  item_share 发生次数>=2 ;  item_like 发生次数>=1\n" +
            "        if (eventBean.getEventId().equals(\"item_share\")) con1State.add(eventBean);\n" +
            "        if (eventBean.getEventId().equals(\"item_like\")) con2State.add(eventBean);\n" +
            "\n" +
            "        // 判断是否是预定义的营销规则的触发事件 ：加购事件（加购的商品价格>100)\n" +
            "        if (eventBean.getEventId().equals(ruleParamBean.getFireEventId()) && Double.parseDouble(eventBean.getProperties().get(ruleParamBean.getFileEventPropertyKey())) > ruleParamBean.getFileEventPropertyValue()) {\n" +
            "            log.warn(\"用户:{},触发了规则：{}\" ,eventBean.getUserId(),ruleParamBean.getRuleId());\n" +
            "\n" +
            "            // 判断该事件的行为人是否满足营销规则中定义的判断条件\n" +
            "            // 离线静态画像条件：直接在bitmap人群中比对就可以了\n" +
            "\n" +
            "            if (btm.contains(eventBean.getUserId())) {\n" +
            "                //  实时动态画像条件：\n" +
            "                //  最近 5分钟发生过 ： 分享行为 2次以上\n" +
            "                //  最近 30分钟发生过 ：点赞行为 1次以上\n" +
            "                int con1Count = 0;\n" +
            "                for (EventBean bean : con1State.get()) {\n" +
            "                    con1Count++;\n" +
            "                }\n" +
            "                if (con1Count > ruleParamBean.getDynamicConditionValue1()) {\n" +
            "                    int con2Count = 0;\n" +
            "                    for (EventBean bean : con2State.get()) {\n" +
            "                        con2Count++;\n" +
            "                    }\n" +
            "                    if (con2Count > ruleParamBean.getDynamicConditionvalue2()) {\n" +
            "                        // 走到这里，就所有的条件（包含离线静态画像标签条件和 实时动态画像标签条件）都满足了\n" +
            "                        log.warn(\"用户: {},时间: {} ,命中了规则: {}\",eventBean.getUserId(),eventBean.getTimeStamp(),ruleParamBean.getRuleId());\n" +
            "                        out.collect(new RtmkMessage(eventBean.getUserId(), eventBean.getTimeStamp(), ruleParamBean.ruleId));\n" +
            "                    }\n" +
            "                }\n" +
            "            }\n" +
            "        }\n" +
            "    }\n" +
            "}\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n" +
            "\n";

    /**
     * Writes two sample user-profile rows (row keys {@code 1} and {@code 2}, encoded as 4-byte
     * ints) into the HBase table {@code user_profile}, column family {@code f}, with the
     * qualifiers {@code age}, {@code gender} and {@code tg03}.
     *
     * <p>The HBase connection and table handle are closed when the method returns, whether or not
     * the write succeeds.
     *
     * @throws IOException if connecting to HBase or writing the rows fails
     */
    public void tags() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "doitedu:2181");

        // Both Connection and Table hold client-side resources and must be closed explicitly.
        try (Connection conn = ConnectionFactory.createConnection(config);
             Table table = conn.getTable(TableName.valueOf("user_profile"))) {

            ArrayList<Put> puts = new ArrayList<>();
            puts.add(newProfilePut(1, 35, "male", 2000));
            puts.add(newProfilePut(2, 18, "male", 150));

            table.put(puts);
        }
    }

    /**
     * Builds the {@link Put} for one user-profile row.
     *
     * @param userId row key, encoded as a 4-byte int
     * @param age    value for qualifier {@code age}, encoded as a 4-byte int
     * @param gender value for qualifier {@code gender}, encoded as UTF-8
     * @param tg03   value for qualifier {@code tg03}, encoded as a 4-byte int
     * @return the populated {@code Put}
     */
    private static Put newProfilePut(int userId, int age, String gender, int tg03) {
        // Bytes.toBytes(String) always encodes as UTF-8, unlike String.getBytes(), which depends
        // on the platform default charset.
        Put put = new Put(Bytes.toBytes(userId));
        put.addColumn(PROFILE_FAMILY, Bytes.toBytes("age"), Bytes.toBytes(age));
        put.addColumn(PROFILE_FAMILY, Bytes.toBytes("gender"), Bytes.toBytes(gender));
        put.addColumn(PROFILE_FAMILY, Bytes.toBytes("tg03"), Bytes.toBytes(tg03));
        return put;
    }

    /**
     * Inserts rule {@code r001_001} of model {@code rule_model_001} into the MySQL table
     * {@code rule_res_v2}: parameter JSON, parameter-bean source, calculator source, the int
     * {@code 1}, a serialized {@link RoaringBitmap} containing 1, 2, 3 and 4, an author name and
     * two timestamps carrying the same current instant.
     *
     * <p>The JDBC connection and statement are closed when the method returns.
     *
     * @throws SQLException if connecting to MySQL or executing the insert fails
     * @throws IOException  if serializing the bitmap fails
     */
    @Test
    public void insertRule() throws SQLException, IOException {
        byte[] btmBytes = serializeBitmap(RoaringBitmap.bitmapOf(1, 2, 3, 4));
        // One instant for both timestamp columns so they cannot differ by a millisecond.
        Timestamp now = new Timestamp(System.currentTimeMillis());

        // NOTE(review): credentials are hard-coded for the demo environment; move them to
        // configuration before using this anywhere else.
        // NOTE(review): the insert has no column list, so it depends on the exact column order of
        // rule_res_v2 — confirm against the table definition.
        try (java.sql.Connection conn =
                     DriverManager.getConnection("jdbc:mysql://doitedu:3306/doit35", "root", "root");
             PreparedStatement stmt =
                     conn.prepareStatement("insert into rule_res_v2 values(?,?,?,?,?,?,?,?,?,?)")) {

            stmt.setString(1, "r001_001");
            stmt.setString(2, "rule_model_001");
            stmt.setString(3, PARAM_JSON);
            stmt.setString(4, PARAM_BEAN_CODE);
            stmt.setString(5, CALCULATOR_CODE);
            stmt.setInt(6, 1);
            stmt.setBytes(7, btmBytes);
            stmt.setString(8, "deepAsTheSeaMan");
            stmt.setTimestamp(9, now);
            stmt.setTimestamp(10, now);

            stmt.execute();
        }
    }

    /**
     * Serializes a bitmap with {@link RoaringBitmap#serialize(java.io.DataOutput)}.
     *
     * @param bitmap the bitmap to serialize
     * @return the serialized bytes
     * @throws IOException if writing to the in-memory stream fails
     */
    private static byte[] serializeBitmap(RoaringBitmap bitmap) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (DataOutputStream dataOut = new DataOutputStream(bout)) {
            bitmap.serialize(dataOut);
        }
        return bout.toByteArray();
    }

}
